package org.ykx.demo.sql

import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object RddToDataframe {

  /**
   * Air-quality record ("kqzl" = 空气质量, air quality) mapped from one CSV row.
   *
   * @param mon           month of the measurement
   * @param city          city name
   * @param aqi           air quality index
   * @param range         AQI range label
   * @param quality_level quality-level label
   * @param pm2_5         PM2.5 concentration
   * @param pm10          PM10 concentration
   * @param so2           SO2 concentration
   * @param co            CO concentration
   * @param no2           NO2 concentration
   * @param o3            O3 concentration
   * @param sort_no       ordering index from the source file
   */
  final case class kqzl(mon: String, city: String, aqi: Int, range: String,
                        quality_level: String, pm2_5: Double, pm10: Double,
                        so2: Double, co: Double, no2: Double, o3: Double, sort_no: Int)

  // Spark bootstrap for a self-contained local demo: two worker threads.
  val conf = new SparkConf().setAppName("TEST_DEMO").setMaster("local[2]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  /**
   * Reads the air-quality CSV and converts it to a DataFrame via the
   * case-class reflection route (RDD[kqzl].toDF()).
   *
   * NOTE(review): the input path is hard-coded; rows with fewer than 12
   * fields or non-numeric values will throw at action time, not here
   * (Spark transformations are lazy).
   *
   * @return a DataFrame whose schema is derived from the kqzl case class
   */
  def getDataFrame: DataFrame = {
    // This import brings in the implicit RDD -> DataFrame conversion (.toDF()).
    import sqlContext.implicits._

    sc.textFile("E://Src/git_scala/kongqizhiliang.csv")
      .map(_.split(","))
      .map { f =>
        kqzl(f(0), f(1), f(2).toInt, f(3), f(4),
             f(5).toDouble, f(6).toDouble, f(7).toDouble,
             f(8).toDouble, f(9).toDouble, f(10).toDouble, f(11).toInt)
      }
      .toDF()
  }

  /**
   * Demo entry point: loads the data, prints the row count, then runs a
   * self-join SQL query filtered to Shenzhen and prints each result row.
   */
  def main(args: Array[String]): Unit = {
    val df = getDataFrame
    println(s"***********kongqizhiliang count is ${df.count()}...****************** ")
//    println("***********[group by]****************** ")
//    df.groupBy("mon").count().foreach { x => println(x)}

    // Register the same DataFrame under two names so the SQL below can self-join.
    df.registerTempTable("kqzl")
    df.registerTempTable("test")
    val kqzl_sz = sqlContext.sql(
      "select * from kqzl t right join test on t.city=test.city where t.city='深圳'")
    kqzl_sz.foreach { x => println(x) }

    // Shut down the SparkContext so the JVM exits cleanly.
    sc.stop()
  }
}